Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to Kafka Source #3118

Merged
merged 11 commits into from
Aug 10, 2023
Merged

Conversation

kkondaka
Copy link
Collaborator

@kkondaka kkondaka commented Aug 5, 2023

Description

Add metrics to Kafka.

  1. Each thread maintains a local copy of counters which are put in a topic level common data structure every 60 seconds.
  2. These metrics are aggregated across different threads of a topic, every 60 seconds and are exported to PluginMetrics

Resolves #3112

Issues Resolved

#3112

Check List

  • New functionality includes testing.
  • New functionality has been documented.
    • New functionality has javadoc added
  • [ X] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Krishna Kondaka <[email protected]>
Krishna Kondaka added 3 commits August 5, 2023 16:13
Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
cmetrics = consumerMetricsMap.get(consumer);
}
if (cmetrics == null) {
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should these be assertions? or at least an error message. unless there is a legitimate case

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not happen. So assertion would be better but I don't see any assertions in the code.

Metric value = entry.getValue();
String metricName = metric.name();
String metricGroup = metric.group();
if ((metricName.contains("consumed")) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we use a map to filter-in/out metrics of interest.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the names exist under two categories, so map may not be straight-forward. Let me check again.

return camelCaseName;
}

public void update(final KafkaConsumer consumer, final String metricName, Integer metricValue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: function name can be renamed to something that differentiates functionalities of both functions.

}
synchronized (consumerMetricsMap) {
long curTime = Instant.now().getEpochSecond();
if (curTime - updateTime > metricUpdateInterval) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is done in caller function as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It's done at two layers. One to update the global data structure and the other to do the aggregation.

});
});
aggregatedMetrics.forEach((name, value) -> {
pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value);
Copy link
Contributor

@hshardeesi hshardeesi Aug 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we are aggregating for a topic then should this function be called once per topic with list of consumers? may be call this from kafka source thread for each topic..

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is called for list of consumers, but indirectly.

cmetrics.put(metricName, value.metricValue());
}
}
synchronized (consumerMetricsMap) {
Copy link
Contributor

@hshardeesi hshardeesi Aug 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this aggregation can be defined in a separate function.

Krishna Kondaka added 2 commits August 8, 2023 15:02
Signed-off-by: Krishna Kondaka <[email protected]>
});
});
aggregatedMetrics.forEach((name, value) -> {
pluginMetrics.gauge("topic."+topicName+"."+getCamelCaseName(name), value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A gauge should be setup once and then updated over time.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed this offline.

e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
numberOfDeserializationErrors++;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we just increment a Micrometer counter?

@@ -216,15 +258,34 @@ Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
return offsetsToCommit;
}

public void updateMetrics() {
long curTime = Instant.now().getEpochSecond();
if (curTime - metricsUpdatedTime >= metricsUpdateInterval) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to track the metrics updates. Micrometer handles this for us. If you provide a DoubleSupplier to a gauge, then Micrometer can get what it needs at that point in time.

camelCaseMap.put("join-rate", "joinRate");
camelCaseMap.put("incoming-byte-rate", "incomingByteRate");
camelCaseMap.put("outgoing-byte-rate", "outgoingByteRate");
camelCaseMap.put("assigned-partitions", "outgoingByteRate");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value should be assignedPartitions. Maybe a copy-paste error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I don't need this. Thanks for finding this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it.

@@ -37,6 +37,7 @@ public boolean getInsecure() {
}

public static final Duration DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT = Duration.ofSeconds(30);
public static final Duration DEFAULT_METRICS_UPDATE_INTERVAL = Duration.ofSeconds(60);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this is unused and may be left over from the previous commits.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it.

Krishna Kondaka added 2 commits August 9, 2023 16:39
Signed-off-by: Krishna Kondaka <[email protected]>
return numberOfPositiveAcknowledgements;
}

public String getTopicMetricName(final String metricName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be private.

metricsNameMap.put("outgoing-byte-rate", "outgoingByteRate");
metricsNameMap.forEach((metricName, camelCaseName) -> {
if (!metricName.contains("-total")) {
pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code should be unit tested.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it might make sense to invert the if-else with the gauge to clarify the behavior some. The condition is really on the metric function, not inside it.

Something like:

if(metricName.equals("records-lag-max") {
  pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> {
    double max = 0.0;
    for (Map.Entry<KafkaConsumer, Map<String, Double>> entry : metricValues.entrySet()) {
    if (entry.getValue().get(metricName) > max) {
        max = entry.getValue().get(metricName);
      }
    }
    return max;
  });
else if(...)
  pluginMetrics.gauge(...)
}

metricsNameMap.forEach((metricName, camelCaseName) -> {
if (!metricName.contains("-total")) {
pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> {
synchronized(metricValues) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to synchronize on metricValues? Won't this collection stay the same?

Might we synchronize on the individual entries?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. That's a good idea. Will try synchronize on individual entries.

double newValue = (Double)value.metricValue();
if (metricName.equals("records-consumed-total")) {
double prevValue = consumerMetrics.get(metricName);
numberOfRecordsConsumed.increment(newValue - prevValue);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than tracking the previous value, do you think we could use the metric's built-in time feature?

https://kafka.apache.org/27/javadoc/org/apache/kafka/common/metrics/Measurable.html#measure-org.apache.kafka.common.metrics.MetricConfig-long-

Thus:

previousValue = ???.measure(???, lastTime);

It's not critical, but might be something to consider. Perhaps for a follow-on.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do this later.


@ParameterizedTest
@ValueSource(ints = {1, 5, 10})
//@ValueSource(ints = {2})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this comment. I'm fine doing this now or in a follow-on PR.

@kkondaka kkondaka merged commit 44e2eaf into opensearch-project:main Aug 10, 2023
24 checks passed
@kkondaka kkondaka deleted the kafka-metrics branch May 13, 2024 05:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Metrics in Kafka Source
3 participants